{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Hands-on!\n",
    "\n",
    "In this practice session, we suggest a few small examples for you to implement on Spark."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## Logistic Regression with Cross-Validation\n",
    "\n",
    "In the `LogisticRegression` exercise, `TrainValidationSplit` was used to evaluate the generated model. Update the exercise to use `CrossValidator` and compare the results. Don't forget to use `Pipeline`.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import LogisticRegression\n",
    "from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator\n",
    "from pyspark.ml import Pipeline\n",
    "from pyspark.mllib.regression import LabeledPoint\n",
    "from pyspark.ml.linalg import Vectors\n",
    "from pyspark.ml.feature import StringIndexer\n",
    "from pyspark.mllib.evaluation import MulticlassMetrics\n",
    "from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
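   },
   "outputs": [],
   "source": [
    "def mapLibSVM(row):\n",
    "    # Returns (labelIndex, features); row[:3] keeps only the first three columns, so petal_width (index 3) is left out\n",
    "    return (row[5], Vectors.dense(row[:3]))"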
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df = spark.read \\\n",
    "        .format(\"csv\") \\\n",
    "        .option(\"header\", \"true\") \\\n",
    "        .option(\"inferSchema\", \"true\") \\\n",
    "        .load(\"datasets/iris.data\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Convertendo a saída de categórica para numérica"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+-----------+------------+-----------+-----------+----------+\n",
      "|sepal_length|sepal_width|petal_length|petal_width|      label|labelIndex|\n",
      "+------------+-----------+------------+-----------+-----------+----------+\n",
      "|         5.1|        3.5|         1.4|        0.2|Iris-setosa|       0.0|\n",
      "|         4.9|        3.0|         1.4|        0.2|Iris-setosa|       0.0|\n",
      "|         4.7|        3.2|         1.3|        0.2|Iris-setosa|       0.0|\n",
      "|         4.6|        3.1|         1.5|        0.2|Iris-setosa|       0.0|\n",
      "|         5.0|        3.6|         1.4|        0.2|Iris-setosa|       0.0|\n",
      "|         5.4|        3.9|         1.7|        0.4|Iris-setosa|       0.0|\n",
      "|         4.6|        3.4|         1.4|        0.3|Iris-setosa|       0.0|\n",
      "|         5.0|        3.4|         1.5|        0.2|Iris-setosa|       0.0|\n",
      "|         4.4|        2.9|         1.4|        0.2|Iris-setosa|       0.0|\n",
      "|         4.9|        3.1|         1.5|        0.1|Iris-setosa|       0.0|\n",
      "|         5.4|        3.7|         1.5|        0.2|Iris-setosa|       0.0|\n",
      "|         4.8|        3.4|         1.6|        0.2|Iris-setosa|       0.0|\n",
      "|         4.8|        3.0|         1.4|        0.1|Iris-setosa|       0.0|\n",
      "|         4.3|        3.0|         1.1|        0.1|Iris-setosa|       0.0|\n",
      "|         5.8|        4.0|         1.2|        0.2|Iris-setosa|       0.0|\n",
      "|         5.7|        4.4|         1.5|        0.4|Iris-setosa|       0.0|\n",
      "|         5.4|        3.9|         1.3|        0.4|Iris-setosa|       0.0|\n",
      "|         5.1|        3.5|         1.4|        0.3|Iris-setosa|       0.0|\n",
      "|         5.7|        3.8|         1.7|        0.3|Iris-setosa|       0.0|\n",
      "|         5.1|        3.8|         1.5|        0.3|Iris-setosa|       0.0|\n",
      "+------------+-----------+------------+-----------+-----------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
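   ],
   "source": [
    "# Encode the string `label` column as a numeric `labelIndex` column\n",
    "indexer = StringIndexer(inputCol=\"label\", outputCol=\"labelIndex\")\n",
    "# Note: `indexer` is rebound here to the transformed DataFrame\n",
    "indexer = indexer.fit(df).transform(df)\n",
    "indexer.show()"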
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+-------------+\n",
      "|label|     features|\n",
      "+-----+-------------+\n",
      "|  0.0|[5.1,3.5,1.4]|\n",
      "|  0.0|[4.9,3.0,1.4]|\n",
      "|  0.0|[4.7,3.2,1.3]|\n",
      "|  0.0|[4.6,3.1,1.5]|\n",
      "|  0.0|[5.0,3.6,1.4]|\n",
      "|  0.0|[5.4,3.9,1.7]|\n",
      "|  0.0|[4.6,3.4,1.4]|\n",
      "|  0.0|[5.0,3.4,1.5]|\n",
      "|  0.0|[4.4,2.9,1.4]|\n",
      "|  0.0|[4.9,3.1,1.5]|\n",
      "|  0.0|[5.4,3.7,1.5]|\n",
      "|  0.0|[4.8,3.4,1.6]|\n",
      "|  0.0|[4.8,3.0,1.4]|\n",
      "|  0.0|[4.3,3.0,1.1]|\n",
      "|  0.0|[5.8,4.0,1.2]|\n",
      "|  0.0|[5.7,4.4,1.5]|\n",
      "|  0.0|[5.4,3.9,1.3]|\n",
      "|  0.0|[5.1,3.5,1.4]|\n",
      "|  0.0|[5.7,3.8,1.7]|\n",
      "|  0.0|[5.1,3.8,1.5]|\n",
      "+-----+-------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
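   ],
   "source": [
    "# Build a DataFrame of (label, features) rows from the indexed data\n",
    "dfLabeled = indexer.rdd.map(mapLibSVM).toDF([\"label\", \"features\"])\n",
    "dfLabeled.show()\n",
    "\n",
    "# Hold out 10% of the rows as the test set; the seed keeps the split reproducible\n",
    "train, test = dfLabeled.randomSplit([0.9, 0.1], seed=12345)"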
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Logistic Model Definition"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "lr = LogisticRegression(labelCol=\"label\", maxIter=15)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cross-Validation - TrainValidationSplit and CrossValidator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "paramGrid = ParamGridBuilder()\\\n",
    "    .addGrid(lr.regParam, [0.1, 0.001]) \\\n",
    "    .build()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "tvs = TrainValidationSplit(estimator=lr,\n",
    "                           estimatorParamMaps=paramGrid,\n",
    "                           evaluator=MulticlassClassificationEvaluator(),\n",
    "                           trainRatio=0.8)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "cval = CrossValidator(estimator=lr,\n",
    "                      estimatorParamMaps=paramGrid,\n",
    "                      evaluator=MulticlassClassificationEvaluator(),\n",
    "                      numFolds=10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Treino do Modelo e Predição do Teste"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "result_tvs = tvs.fit(train).transform(test)\n",
    "result_cval = cval.fit(train).transform(test)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "preds_tvs = result_tvs.select([\"prediction\", \"label\"])\n",
    "preds_cval = result_cval.select([\"prediction\", \"label\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Avaliação dos Modelos"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Instantiate the metrics objects from (prediction, label) pairs\n",
    "metrics_tvs = MulticlassMetrics(preds_tvs.rdd)\n",
    "metrics_cval = MulticlassMetrics(preds_cval.rdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Summary Stats\n",
      "/home/minhotmog/spark-2.2.0-bin-hadoop2.7/python/pyspark/mllib/evaluation.py:262: UserWarning: Deprecated in 2.0.0. Use accuracy.\n",
      "  warnings.warn(\"Deprecated in 2.0.0. Use accuracy.\")\n",
      "F1 Score = 0.9090909090909091\n",
      "Accuracy = 0.9090909090909091\n",
      "Weighted recall = 0.9090909090909092\n",
      "Weighted precision = 0.9242424242424243\n",
      "Weighted F(1) Score = 0.8980716253443526\n",
      "Weighted F(0.5) Score = 0.9070010449320796\n",
      "Weighted false positive rate = 0.07575757575757575\n"
     ]
    }
   ],
   "source": [
    "# Overall statistics for the TrainValidationSplit method\n",
    "print(\"Summary Stats\")\n",
    "print(\"F1 Score = %s\" % metrics_tvs.fMeasure())\n",
    "print(\"Accuracy = %s\" % metrics_tvs.accuracy)\n",
    "print(\"Weighted recall = %s\" % metrics_tvs.weightedRecall)\n",
    "print(\"Weighted precision = %s\" % metrics_tvs.weightedPrecision)\n",
    "print(\"Weighted F(1) Score = %s\" % metrics_tvs.weightedFMeasure())\n",
    "print(\"Weighted F(0.5) Score = %s\" % metrics_tvs.weightedFMeasure(beta=0.5))\n",
    "print(\"Weighted false positive rate = %s\" % metrics_tvs.weightedFalsePositiveRate)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Summary Stats\n",
      "F1 Score = 0.9090909090909091\n",
      "Accuracy = 0.9090909090909091\n",
      "Weighted recall = 0.9090909090909092\n",
      "Weighted precision = 0.9242424242424243\n",
      "Weighted F(1) Score = 0.8980716253443526\n",
      "Weighted F(0.5) Score = 0.9070010449320796\n",
      "Weighted false positive rate = 0.07575757575757575\n"
     ]
    }
   ],
   "source": [
    "# Overall statistics for the CrossValidator method\n",
    "print(\"Summary Stats\")\n",
    "print(\"F1 Score = %s\" % metrics_cval.fMeasure())\n",
    "print(\"Accuracy = %s\" % metrics_cval.accuracy)\n",
    "print(\"Weighted recall = %s\" % metrics_cval.weightedRecall)\n",
    "print(\"Weighted precision = %s\" % metrics_cval.weightedPrecision)\n",
    "print(\"Weighted F(1) Score = %s\" % metrics_cval.weightedFMeasure())\n",
    "print(\"Weighted F(0.5) Score = %s\" % metrics_cval.weightedFMeasure(beta=0.5))\n",
    "print(\"Weighted false positive rate = %s\" % metrics_cval.weightedFalsePositiveRate)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Conclusion:\n",
    "\n",
    "Since both model-selection methods (`TrainValidationSplit` and `CrossValidator`) tune the same predictive model (logistic regression), and since the dataset is relatively small, it is natural for both to find the same (or approximately the same) optimal value for the hyperparameters tested.\n",
    "\n",
    "For that reason, once that hyperparameter value has been found, the two models show very similar results when evaluated on the test set (which is also the same for both models).\n",
    "\n",
    "-------------------------------"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Random Forest\n",
    "\n",
    "Use the previous exercise as a base, but now with `pyspark.ml.classification.RandomForestClassifier`. Use `Pipeline` and `CrossValidator` to evaluate the generated model."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "### Libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import RandomForestClassifier"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Random Forest Model Definition"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "rf = RandomForestClassifier(labelCol=\"label\", featuresCol=\"features\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cross-Validation - CrossValidator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "paramGrid = ParamGridBuilder()\\\n",
    "    .addGrid(rf.numTrees, [1, 100]) \\\n",
    "    .build()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "cval = CrossValidator(estimator=rf,\n",
    "                      estimatorParamMaps=paramGrid,\n",
    "                      evaluator=MulticlassClassificationEvaluator(),\n",
    "                      numFolds=10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Model Training and Test Prediction"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "results = cval.fit(train).transform(test)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "predictions = results.select([\"prediction\", \"label\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Model Evaluation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Summary Stats\n",
      "F1 Score = 1.0\n",
      "Accuracy = 1.0\n",
      "Weighted recall = 1.0\n",
      "Weighted precision = 1.0\n",
      "Weighted F(1) Score = 1.0\n",
      "Weighted F(0.5) Score = 1.0\n",
      "Weighted false positive rate = 0.0\n"
     ]
    }
   ],
   "source": [
    "# Instantiate the metrics object from (prediction, label) pairs\n",
    "metrics = MulticlassMetrics(predictions.rdd)\n",
    "\n",
    "# Overall statistics for the random forest selected by CrossValidator\n",
    "print(\"Summary Stats\")\n",
    "print(\"F1 Score = %s\" % metrics.fMeasure())\n",
    "print(\"Accuracy = %s\" % metrics.accuracy)\n",
    "print(\"Weighted recall = %s\" % metrics.weightedRecall)\n",
    "print(\"Weighted precision = %s\" % metrics.weightedPrecision)\n",
    "print(\"Weighted F(1) Score = %s\" % metrics.weightedFMeasure())\n",
    "print(\"Weighted F(0.5) Score = %s\" % metrics.weightedFMeasure(beta=0.5))\n",
    "print(\"Weighted false positive rate = %s\" % metrics.weightedFalsePositiveRate)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Conclusion:\n",
    "\n",
    "Since RandomForest is a relatively robust classifier and Iris is a relatively simple problem, it is notable that this model correctly predicts every observation in the held-out test set."
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Apache Toree - PySpark",
   "language": "python",
   "name": "apache_toree_pyspark"
  },
  "language_info": {
   "codemirror_mode": "text/x-ipython",
   "file_extension": ".py",
   "mimetype": "text/x-ipython",
   "name": "python",
   "pygments_lexer": "python",
   "version": "3.5.2\n"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
